// Copyright Epic Games, Inc. All Rights Reserved.

#include "jupiter.h"

#include "diag/logging.h"

#include <zencore/compactbinary.h>
#include <zencore/compositebuffer.h>
#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/scopeguard.h>
#include <zencore/thread.h>
#include <zencore/trace.h>
#include <zenhttp/formatters.h>
#include <zenutil/basicfile.h>

ZEN_THIRD_PARTY_INCLUDES_START
#include <fmt/format.h>
ZEN_THIRD_PARTY_INCLUDES_END

#if ZEN_PLATFORM_WINDOWS
#	pragma comment(lib, "Crypt32.lib")
#	pragma comment(lib, "Wldap32.lib")
#endif

#include <json11.hpp>

using namespace std::literals;

namespace zen {

namespace detail {
	CloudCacheResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv)
	{
		if (Response.Error)
		{
			return {.SentBytes		= gsl::narrow<uint64_t>(Response.UploadedBytes),
					.ReceivedBytes	= gsl::narrow<uint64_t>(Response.DownloadedBytes),
					.ElapsedSeconds = Response.ElapsedSeconds,
					.ErrorCode		= Response.Error.value().ErrorCode,
					.Reason			= Response.ErrorMessage(ErrorPrefix),
					.Success		= false};
		}
		if (!Response.IsSuccess())
		{
			return {.SentBytes		= gsl::narrow<uint64_t>(Response.UploadedBytes),
					.ReceivedBytes	= gsl::narrow<uint64_t>(Response.DownloadedBytes),
					.ElapsedSeconds = Response.ElapsedSeconds,
					.ErrorCode		= static_cast<int32_t>(Response.StatusCode),
					.Reason			= Response.ErrorMessage(ErrorPrefix),
					.Success		= false};
		}
		return {.Response		= Response.ResponsePayload,
				.SentBytes		= gsl::narrow<uint64_t>(Response.UploadedBytes),
				.ReceivedBytes	= gsl::narrow<uint64_t>(Response.DownloadedBytes),
				.ElapsedSeconds = Response.ElapsedSeconds,
				.ErrorCode		= 0,
				.Success		= true};
	}
}  // namespace detail

CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient)
{
}

CloudCacheSession::~CloudCacheSession()
{
}

CloudCacheResult
CloudCacheSession::Authenticate()
{
	bool OK = m_CacheClient->m_HttpClient.Authenticate();
	return {.Success = OK};
}

CloudCacheResult
CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType)
{
	ZEN_TRACE_CPU("JupiterClient::GetRef");

	HttpClient::Response Response =
		m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
										{HttpClient::Accept(RefType)});

	return detail::ConvertResponse(Response, "CloudCacheSession::GetRef"sv);
}

CloudCacheResult
CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key)
{
	ZEN_TRACE_CPU("JupiterClient::GetBlob");
	HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()),
																	{HttpClient::Accept(ZenContentType::kBinary)});

	return detail::ConvertResponse(Response);
}

CloudCacheResult
CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath)
{
	ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob");

	HttpClient::Response Response =
		m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
											 TempFolderPath,
											 {HttpClient::Accept(ZenContentType::kCompressedBinary)});

	return detail::ConvertResponse(Response);
}

CloudCacheResult
CloudCacheSession::GetInlineBlob(std::string_view	   Namespace,
								 std::string_view	   BucketId,
								 const IoHash&		   Key,
								 IoHash&			   OutPayloadHash,
								 std::filesystem::path TempFolderPath)
{
	ZEN_TRACE_CPU("JupiterClient::GetInlineBlob");

	HttpClient::Response Response =
		m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
											 TempFolderPath,
											 {{"Accept", "application/x-jupiter-inline"}});

	CloudCacheResult Result = detail::ConvertResponse(Response);

	if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end())
	{
		const std::string& PayloadHashHeader = It->second;
		if (PayloadHashHeader.length() == IoHash::StringLength)
		{
			OutPayloadHash = IoHash::FromHexString(PayloadHashHeader);
		}
	}

	return Result;
}

CloudCacheResult
CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key)
{
	ZEN_TRACE_CPU("JupiterClient::GetObject");

	HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()),
																	{HttpClient::Accept(ZenContentType::kCbObject)});

	return detail::ConvertResponse(Response);
}

PutRefResult
CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType)
{
	ZEN_TRACE_CPU("JupiterClient::PutRef");

	Ref.SetContentType(RefType);

	IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());

	HttpClient::Response Response =
		m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
										Ref,
										{{"X-Jupiter-IoHash", Hash.ToHexString()}});

	PutRefResult Result = {detail::ConvertResponse(Response)};
	if (Result.Success)
	{
		std::string	 JsonError;
		json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
		if (JsonError.empty())
		{
			json11::Json::array Needs = Json["needs"].array_items();
			for (const auto& Need : Needs)
			{
				Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
			}
		}
		Result.RawHash = Hash;
	}
	return Result;
}

FinalizeRefResult
CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash)
{
	ZEN_TRACE_CPU("JupiterClient::FinalizeRef");

	HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(
		fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()),
		{{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}});

	FinalizeRefResult Result = {detail::ConvertResponse(Response)};

	if (Result.Success)
	{
		std::string	 JsonError;
		json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError);
		if (JsonError.empty())
		{
			json11::Json::array Needs = Json["needs"].array_items();
			for (const auto& Need : Needs)
			{
				Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
			}
		}
	}
	return Result;
}

CloudCacheResult
CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
{
	ZEN_TRACE_CPU("JupiterClient::PutBlob");

	HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob);

	return detail::ConvertResponse(Response);
}

CloudCacheResult
CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
{
	ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");

	Blob.SetContentType(ZenContentType::kCompressedBinary);
	HttpClient::Response Response =
		m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob);

	return detail::ConvertResponse(Response);
}

CloudCacheResult
CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload)
{
	ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");

	HttpClient::Response Response =
		m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
										   Payload,
										   ZenContentType::kCompressedBinary);

	return detail::ConvertResponse(Response);
}

CloudCacheResult
CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object)
{
	ZEN_TRACE_CPU("JupiterClient::PutObject");

	Object.SetContentType(ZenContentType::kCbObject);
	HttpClient::Response Response =
		m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object);

	return detail::ConvertResponse(Response);
}

CloudCacheResult
CloudCacheSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key)
{
	ZEN_TRACE_CPU("JupiterClient::RefExists");

	HttpClient::Response Response =
		m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()));

	return detail::ConvertResponse(Response);
}

GetObjectReferencesResult
CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key)
{
	ZEN_TRACE_CPU("JupiterClient::GetObjectReferences");

	HttpClient::Response Response =
		m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()),
										 {HttpClient::Accept(ZenContentType::kCbObject)});

	GetObjectReferencesResult Result = {detail::ConvertResponse(Response)};

	if (Result.Success)
	{
		const CbObject ReferencesResponse = Response.AsObject();
		for (auto& Item : ReferencesResponse["references"sv])
		{
			Result.References.insert(Item.AsHash());
		}
	}
	return Result;
}

CloudCacheResult
CloudCacheSession::BlobExists(std::string_view Namespace, const IoHash& Key)
{
	return CacheTypeExists(Namespace, "blobs"sv, Key);
}

CloudCacheResult
CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key)
{
	return CacheTypeExists(Namespace, "compressed-blobs"sv, Key);
}

CloudCacheResult
CloudCacheSession::ObjectExists(std::string_view Namespace, const IoHash& Key)
{
	return CacheTypeExists(Namespace, "objects"sv, Key);
}

CloudCacheExistsResult
CloudCacheSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
{
	return CacheTypeExists(Namespace, "blobs"sv, Keys);
}

CloudCacheExistsResult
CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
{
	return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys);
}

CloudCacheExistsResult
CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys)
{
	return CacheTypeExists(Namespace, "objects"sv, Keys);
}

std::vector<IoHash>
CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes)
{
	//	ExtendableStringBuilder<256> Uri;
	//	Uri << m_CacheClient->ServiceUrl();
	//	Uri << "/api/v1/s/" << Namespace;

	ZEN_UNUSED(Namespace, BucketId, ChunkHashes);

	return {};
}

CloudCacheResult
CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key)
{
	ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");

	HttpClient::Response Response = m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString()));

	return detail::ConvertResponse(Response);
}

CloudCacheExistsResult
CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys)
{
	ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");

	ExtendableStringBuilder<256> Body;
	Body << "[";
	for (const auto& Key : Keys)
	{
		Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\"";
	}
	Body << "]";
	IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size());
	Payload.SetContentType(ZenContentType::kJSON);

	HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace),
																	 Payload,
																	 {HttpClient::Accept(ZenContentType::kCbObject)});

	CloudCacheExistsResult Result = {detail::ConvertResponse(Response)};

	if (Result.Success)
	{
		const CbObject ExistsResponse = Response.AsObject();
		for (auto& Item : ExistsResponse["needs"sv])
		{
			Result.Needs.insert(Item.AsHash());
		}
	}
	return Result;
}

/**
 * An access token provider that holds a token that will never change.
 */
class StaticTokenProvider final : public CloudCacheTokenProvider
{
public:
	StaticTokenProvider(CloudCacheAccessToken Token) : m_Token(std::move(Token)) {}

	virtual ~StaticTokenProvider() = default;

	virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Token; }

private:
	CloudCacheAccessToken m_Token;
};

std::unique_ptr<CloudCacheTokenProvider>
CloudCacheTokenProvider::CreateFromStaticToken(CloudCacheAccessToken Token)
{
	return std::make_unique<StaticTokenProvider>(std::move(Token));
}

class OAuthClientCredentialsTokenProvider final : public CloudCacheTokenProvider
{
public:
	OAuthClientCredentialsTokenProvider(const CloudCacheTokenProvider::OAuthClientCredentialsParams& Params)
	{
		m_Url		   = std::string(Params.Url);
		m_ClientId	   = std::string(Params.ClientId);
		m_ClientSecret = std::string(Params.ClientSecret);
	}

	virtual ~OAuthClientCredentialsTokenProvider() = default;

	virtual CloudCacheAccessToken AcquireAccessToken() final override
	{
		using namespace std::chrono;

		std::string Body =
			fmt::format("client_id={}&scope=cache_access&grant_type=client_credentials&client_secret={}", m_ClientId, m_ClientSecret);

		cpr::Response Response =
			cpr::Post(cpr::Url{m_Url}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{std::move(Body)});

		if (Response.error || Response.status_code != 200)
		{
			return {};
		}

		std::string	 JsonError;
		json11::Json Json = json11::Json::parse(Response.text, JsonError);

		if (JsonError.empty() == false)
		{
			return {};
		}

		std::string						 Token			  = Json["access_token"].string_value();
		int64_t							 ExpiresInSeconds = static_cast<int64_t>(Json["expires_in"].int_value());
		CloudCacheAccessToken::TimePoint ExpireTime		  = CloudCacheAccessToken::Clock::now() + seconds(ExpiresInSeconds);

		return {.Value = fmt::format("Bearer {}", Token), .ExpireTime = ExpireTime};
	}

private:
	std::string m_Url;
	std::string m_ClientId;
	std::string m_ClientSecret;
};

std::unique_ptr<CloudCacheTokenProvider>
CloudCacheTokenProvider::CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params)
{
	return std::make_unique<OAuthClientCredentialsTokenProvider>(Params);
}

class CallbackTokenProvider final : public CloudCacheTokenProvider
{
public:
	CallbackTokenProvider(std::function<CloudCacheAccessToken()>&& Callback) : m_Callback(std::move(Callback)) {}

	virtual ~CallbackTokenProvider() = default;

	virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Callback(); }

private:
	std::function<CloudCacheAccessToken()> m_Callback;
};

std::unique_ptr<CloudCacheTokenProvider>
CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken()>&& Callback)
{
	return std::make_unique<CallbackTokenProvider>(std::move(Callback));
}

static std::optional<std::function<HttpClientAccessToken()>>
GetHttpClientAccessProvider(CloudCacheTokenProvider* TokenProvider)
{
	if (TokenProvider == nullptr)
	{
		return {};
	}
	auto ProviderFunc = [TokenProvider]() -> HttpClientAccessToken {
		CloudCacheAccessToken Token = TokenProvider->AcquireAccessToken();
		return HttpClientAccessToken{.Value = Token.Value, .ExpireTime = Token.ExpireTime};
	};
	return ProviderFunc;
}

CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider)
: m_Log(zen::logging::Get("jupiter"))
, m_DefaultDdcNamespace(Options.DdcNamespace)
, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace)
, m_ComputeCluster(Options.ComputeCluster)
, m_TokenProvider(std::move(TokenProvider))
, m_HttpClient(Options.ServiceUrl,
			   HttpClientSettings{.ConnectTimeout	   = Options.ConnectTimeout,
								  .Timeout			   = Options.Timeout,
								  .AccessTokenProvider = GetHttpClientAccessProvider(m_TokenProvider.get()),
								  .AssumeHttp2		   = Options.AssumeHttp2,
								  .AllowResume		   = Options.AllowResume,
								  .RetryCount		   = Options.RetryCount})
{
	ZEN_ASSERT(m_TokenProvider.get() != nullptr);
}

CloudCacheClient::~CloudCacheClient()
{
}

}  // namespace zen
